package com.atguigu.edu.app.dwd.db;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.util.KafkaUtil;
import com.google.inject.internal.util.$Strings;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

/**
 * DWD layer job: extracts newly inserted course-review records ({@code review_info}
 * table, {@code insert} operations) from the ODS change-log topic {@code topic_db}
 * and forwards them unchanged to a dedicated downstream Kafka topic.
 */
public class DwdInteractionCourceComment {
    public static void main(String[] args) throws Exception {
        //TODO 1 Prepare the streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2 Configure state backend / checkpointing (disabled for local development)
                 /*
                 env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE );
                 env.getCheckpointConfig().setCheckpointTimeout( 3 * 60 * 1000L );
                 env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
                 env.setStateBackend(new HashMapStateBackend());
                 env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall/ck");
                 System.setProperty("HADOOP_USER_NAME", "atguigu");
                  */

        //TODO 3 Read the change-log stream from Kafka topic_db
        String topicName = "topic_db";
        String groupId = "dwd_interaction_cource_comment";
        DataStreamSource<String> dbStream = env.addSource(KafkaUtil.getKafkaConsumer(topicName, groupId));

        //TODO 4 Keep only inserts on the course-review table
        SingleOutputStreamOperator<String> jsonObjStream =
                dbStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String table = jsonObject.getString("table");
                    String type = jsonObject.getString("type");
                    if ("review_info".equals(table) && "insert".equals(type)) {
                        // Emit the original raw record: avoids a fastjson re-serialization
                        // that could reorder keys, and is cheaper than toString().
                        out.collect(value);
                    }
                } catch (Exception ignored) {
                    // A single malformed record must not fail the whole streaming job;
                    // dirty data is deliberately dropped by this filter.
                }
            }
        });
        //TODO 5 Write the filtered stream to the downstream DWD Kafka topic
        // NOTE(review): groupId doubles as the sink topic name — confirm this matches
        // the topic naming convention expected by downstream consumers.
        jsonObjStream.addSink(KafkaUtil.getKafkaProducer(groupId));

        //TODO 6 Submit the job
        env.execute(groupId);
    }
}
